#!/usr/bin/groovy
@Library('test-shared-library') _
import groovy.json.JsonSlurperClassic
import java.nio.file.Files
import java.nio.file.Paths

// Job-level settings: trigger a build once a day (Jenkins picks a hashed minute
// within hour 16) and retain only the 30 most recent builds.
properties([
        pipelineTriggers([cron('H 16 * * *')]),
        buildDiscarder(logRotator(numToKeepStr: '30'))
])

/**
 * Reads a simple {@code key=value} properties file from the workspace into a map.
 *
 * Blank lines and lines whose first non-blank character is {@code #} are ignored.
 * Only the first {@code =} separates key from value, so values may themselves
 * contain {@code =}. Keys and values are trimmed, which also drops a trailing
 * carriage return from files with Windows line endings. A key without a value
 * maps to the empty string.
 *
 * @param file path of the properties file, as accepted by the {@code readFile} step
 * @return map of property name to property value (both strings)
 */
def readPropertiesFile(file) {
    def properties = [:]
    readFile(file).split("\n").each { rawLine ->
        def line = rawLine.trim()
        // `return` inside the closure just moves on to the next line.
        if (line.isEmpty() || line.startsWith("#")) {
            return
        }
        // Limit of 2 keeps everything after the first '=' as the value.
        def splits = line.split("=", 2)
        properties[splits[0].trim()] = splits.length == 2 ? splits[1].trim() : ""
    }
    return properties
}

/**
 * Returns the Sparkling Water version with every "-SNAPSHOT" marker removed.
 *
 * @param props map-like object holding a 'version' entry
 * @return the value of 'version' without "-SNAPSHOT"
 */
static String getSparklingVersion(props) {
    final String snapshotMarker = "-SNAPSHOT"
    String releaseVersion = props['version'].replace(snapshotMarker, "")
    return releaseVersion
}

/**
 * Runs a shell script with the `sw_env_python3.6` conda environment activated
 * and returns everything the script wrote to standard output.
 *
 * @param script shell commands to execute inside the conda environment
 * @return captured standard output of the whole wrapped script
 */
def shWithSWPyEnv(script) {
    // Activate the environment before the caller's commands, deactivate afterwards.
    def wrappedScript = """
        . ~/miniconda/etc/profile.d/conda.sh 
        conda activate sw_env_python3.6
        ${script}
        conda deactivate
    """
    sh(returnStdout: true, script: wrappedScript)
}

/**
 * Requests a new Databricks cluster through the `databricks` CLI.
 *
 * @param dbcVersion        value placed in the cluster's "spark_version" field
 * @param sparkMajorVersion Spark major version, used only as a suffix of the cluster name
 * @return the "cluster_id" field of the JSON printed by the CLI
 */
def startCluster(dbcVersion, sparkMajorVersion) {
    def createCommand = """
        databricks clusters create --json '{
            "num_workers": 3,
            "cluster_name": "SparklingWaterTest-${sparkMajorVersion}",
            "spark_version": "${dbcVersion}",
            "spark_conf": {},
            "aws_attributes": {
                "first_on_demand": 4,
                "availability": "SPOT_WITH_FALLBACK",
                "zone_id": "us-east-1d",
                "instance_profile_arn": null,
                "spot_bid_price_percent": 100,
                "ebs_volume_type": "GENERAL_PURPOSE_SSD",
                "ebs_volume_count": 3,
                "ebs_volume_size": 100
            },
            "node_type_id": "m4.large",
            "ssh_public_keys": [],
            "custom_tags": {},
            "spark_env_vars": {},
            "autotermination_minutes": 120,
            "init_scripts": []
        }'
    """
    // The CLI output is parsed as JSON; only the cluster id is of interest.
    def response = new JsonSlurperClassic().parseText(shWithSWPyEnv(createCommand).trim())
    return response["cluster_id"]
}

/**
 * Polls the cluster state every 10 seconds until it is reported as RUNNING.
 *
 * Pauses use the Pipeline `sleep` step rather than `Thread.sleep`, so the wait
 * is handled by Jenkins itself instead of blocking the thread that executes
 * the Pipeline script.
 *
 * Note that the pause also runs once after RUNNING has been observed, so the
 * function returns roughly 10 seconds after the cluster reports ready.
 *
 * @param clusterId id of the cluster to wait for
 * @throws RuntimeException when the cluster reports ERROR, UNKNOWN, TERMINATED or TERMINATING
 */
def waitForCluster(clusterId) {
    def ready = false
    while(!ready) {
        def json = shWithSWPyEnv("databricks clusters get --cluster-id ${clusterId}").trim()
        def jsonSlurper = new JsonSlurperClassic()
        def cfg = jsonSlurper.parseText(json)
        ready = cfg["state"] == "RUNNING"
        // These states mean the cluster will not become RUNNING by just waiting.
        if (["ERROR", "UNKNOWN", "TERMINATED", "TERMINATING"].contains(cfg["state"])) {
            throw new RuntimeException("Cluster failed to start!")
        }
        sleep(time: 10, unit: 'SECONDS')
    }
}

/**
 * Invokes `databricks clusters delete` for the given cluster.
 *
 * @param clusterId id of the cluster to delete
 * @return standard output of the CLI invocation
 */
def deleteCluster(clusterId) {
    def deleteCommand = "databricks clusters delete --cluster-id ${clusterId}"
    return shWithSWPyEnv(deleteCommand)
}

/**
 * Selects the Spark versions that should be tested on Databricks.
 *
 * Takes the whitespace-separated list in 'supportedSparkVersions' and returns
 * the tail of that list starting at the version named by
 * 'databricksTestSinceSpark' (inclusive). Surrounding and repeated whitespace
 * in the list is tolerated.
 *
 * @param props map-like object with 'supportedSparkVersions' and 'databricksTestSinceSpark'
 * @return new mutable list of Spark versions, in the order they were declared
 * @throws IllegalArgumentException when the boundary version is not in the supported list
 */
static def getDatabricksSparkVersions(props) {
    def sparkVersions = props["supportedSparkVersions"].trim().split("\\s+").toList()
    def boundaryVersion = props["databricksTestSinceSpark"]?.trim()
    def boundaryIndex = sparkVersions.indexOf(boundaryVersion)
    // Fail with an explicit message instead of letting subList(-1, ...) blow up.
    if (boundaryIndex < 0) {
        throw new IllegalArgumentException(
                "databricksTestSinceSpark '${boundaryVersion}' is not listed in supportedSparkVersions ${sparkVersions}")
    }
    return new ArrayList<String>(sparkVersions.subList(boundaryIndex, sparkVersions.size()))
}

/**
 * Polls the cluster's library status once per second until the given artifact
 * is reported as INSTALLED.
 *
 * The artifact is looked up by its DBFS location `dbfs:/<clusterId>/<artifact>`
 * under the library key given by `type`. If the artifact is not listed at all,
 * or its status is SKIPPED, FAILED or UNINSTALL_ON_RESTART, the test
 * environment is cleaned up and the build fails.
 *
 * Pauses use the Pipeline `sleep` step rather than `Thread.sleep`, so the wait
 * is handled by Jenkins itself.
 *
 * @param clusterId id of the cluster the library is being attached to
 * @param artifact  file name of the artifact
 * @param type      library key to match on, e.g. "jar" or "whl"
 * @throws RuntimeException when the library is not listed or cannot be attached
 */
def waitForArtifactToAttach(clusterId, artifact, type) {
    def expectedPath = "dbfs:/${clusterId}/${artifact}".toString()
    def ready = false
    while(!ready) {
        def rawJson = shWithSWPyEnv("databricks libraries cluster-status --cluster-id ${clusterId}").trim()
        def parsedJson = new JsonSlurperClassic().parseText(rawJson)
        def libraryEntry = parsedJson.library_statuses?.find { it.library[type] == expectedPath }
        // Without an entry there is no status to wait on; fail explicitly (after
        // cleanup) instead of dereferencing null.
        if (libraryEntry == null) {
            cleanEnvironment(clusterId)
            throw new RuntimeException("Library ${artifact} is not listed in the status of cluster ${clusterId}")
        }
        def status = libraryEntry.status
        ready = status == "INSTALLED"
        if (["SKIPPED", "FAILED", "UNINSTALL_ON_RESTART"].contains(status)) {
            cleanEnvironment(clusterId)
            throw new RuntimeException("Failed to attach library ${artifact}")
        }
        sleep(time: 1, unit: 'SECONDS')
    }
}

/**
 * Uploads the built artifacts to DBFS and attaches the Scala and Python ones
 * to the cluster as libraries.
 *
 * The jar and the wheel are copied under `dbfs:/<clusterId>/`, installed, and
 * waited for. The R package and the H2O R tarball are only copied, under
 * `dbfs:/FileStore/<clusterId>/`.
 *
 * @param clusterId      id of the target cluster
 * @param scalaArtifact  file name of the assembly jar in assembly/build/libs
 * @param pythonArtifact file name of the wheel in py/build/pkg/dist
 * @param rArtifact      file name of the R package in r/build
 */
def installDependencies(clusterId, scalaArtifact, pythonArtifact, rArtifact) {
    // Scala: upload the jar, install it, wait until it is attached.
    def installJar = """
        databricks fs cp assembly/build/libs/${scalaArtifact} dbfs:/${clusterId}/${scalaArtifact}
        databricks libraries install --cluster-id ${clusterId} --jar dbfs:/${clusterId}/${scalaArtifact}
    """
    shWithSWPyEnv(installJar)
    waitForArtifactToAttach(clusterId, scalaArtifact, "jar")

    // Python: upload the wheel, install it, wait until it is attached.
    def installWheel = """
        databricks fs cp py/build/pkg/dist/${pythonArtifact} dbfs:/${clusterId}/${pythonArtifact}
        databricks libraries install --cluster-id ${clusterId} --whl dbfs:/${clusterId}/${pythonArtifact}
    """
    shWithSWPyEnv(installWheel)
    waitForArtifactToAttach(clusterId, pythonArtifact, "whl")

    // R: only upload the packages; nothing is installed as a cluster library here.
    def uploadRPackages = """
        databricks fs cp r/build/${rArtifact} dbfs:/FileStore/${clusterId}/${rArtifact}
        databricks fs cp h2o-3/h2o-r/h2o_*.99999.tar.gz dbfs:/FileStore/${clusterId}/h2o.tar.gz
    """
    shWithSWPyEnv(uploadRPackages)
}

/**
 * Imports the test notebook for one language into the Databricks workspace,
 * submits it as a one-off run on the given cluster and waits for the result.
 *
 * The notebook source is `ci/databricksTests/test.<ext>`, where the extension
 * is "py" for python and the language name itself otherwise.
 *
 * @param workspaceDir workspace directory the notebook is imported into
 * @param clusterId    id of the existing cluster the run is submitted to
 * @param language     notebook language, passed to `--language`
 */
def test(workspaceDir, clusterId, language) {
    def extension = (language == "python") ? "py" : language
    def importCommand = "databricks workspace import --language ${language} ci/databricksTests/test.${extension} ${workspaceDir}/test.${extension}"
    shWithSWPyEnv(importCommand)

    def submitCommand = """
        databricks runs submit --json '
            {
              "name": "Test ${language}",
              "existing_cluster_id": "${clusterId}",
              "notebook_task": {
                "notebook_path": "${workspaceDir}/test.${extension}"
              },
              "timeout_seconds": 600
            }
            '
        """
    // The CLI prints JSON that carries the id of the submitted run.
    def submission = new JsonSlurperClassic().parseText(shWithSWPyEnv(submitCommand).trim())
    waitForJobToFinish(submission.run_id, language, clusterId)
}

/**
 * Polls a submitted Databricks run once per second until it succeeds.
 *
 * The run counts as failed, and the test environment is cleaned up before the
 * build is aborted, when either
 *   - its result state is FAILED, TIMEDOUT or CANCELED (the Jobs API spelling;
 *     the two-L spelling CANCELLED is accepted as well), or
 *   - it reports no result state and its life-cycle state is INTERNAL_ERROR or
 *     SKIPPED, in which case a result state may never appear and polling would
 *     otherwise never end.
 *
 * Pauses use the Pipeline `sleep` step rather than `Thread.sleep`, so the wait
 * is handled by Jenkins itself.
 *
 * @param jobId     id passed to `databricks runs get --run-id`
 * @param language  language under test, used in the failure message
 * @param clusterId id of the cluster to clean up on failure
 * @throws RuntimeException when the run did not succeed
 */
def waitForJobToFinish(jobId, language, clusterId) {
    def failedResultStates = ["FAILED", "TIMEDOUT", "CANCELED", "CANCELLED"]
    def deadLifeCycleStates = ["INTERNAL_ERROR", "SKIPPED"]
    def done = false
    while(!done) {
        def rawJson = shWithSWPyEnv("databricks runs get --run-id ${jobId}").trim()
        def parsedJson = new JsonSlurperClassic().parseText(rawJson)
        def status = parsedJson.state.result_state
        def lifeCycleState = parsedJson.state.life_cycle_state
        def endedWithoutResult = status == null && deadLifeCycleStates.contains(lifeCycleState)
        if (failedResultStates.contains(status) || endedWithoutResult) {
            cleanEnvironment(clusterId)
            throw new RuntimeException("${language} tests failed!\n\nDetails:\n\n${rawJson}")
        }
        done = status == "SUCCESS"
        sleep(time: 1, unit: 'SECONDS')
    }
}

/**
 * Recursively removes the two DBFS directories that belong to the cluster:
 * `dbfs:/<clusterId>` and `dbfs:/FileStore/<clusterId>`.
 *
 * @param clusterId id of the cluster whose uploaded files are removed
 * @return standard output of the CLI invocations
 */
def deleteDependencies(clusterId) {
    def removeCommands = """
        databricks fs rm -r dbfs:/${clusterId}
        databricks fs rm -r dbfs:/FileStore/${clusterId}
    """
    return shWithSWPyEnv(removeCommands)
}

/**
 * Reads the H2O version from `h2o-3/gradle.properties`.
 *
 * Uses the first line that starts with "version" and returns everything after
 * its first '=', trimmed so that stray spaces or a trailing carriage return do
 * not end up in the sed expressions and Gradle arguments built from it.
 *
 * @return the version value declared in the H2O checkout
 * @throws IllegalStateException when no usable version line is present
 */
String getH2OBranchMajorVersion() {
    def versionLine = readFile("h2o-3/gradle.properties").split("\n").find { line -> line.startsWith('version') }
    // Limit of 2 keeps a value intact even if it contains '=' itself.
    def parts = versionLine == null ? null : versionLine.split("=", 2)
    if (parts == null || parts.length < 2) {
        throw new IllegalStateException("No 'version=<value>' line found in h2o-3/gradle.properties")
    }
    return parts[1].trim()
}

/**
 * @return the fixed H2O major name used for branch builds, "bleeding_edge"
 */
String getH2OBranchMajorName() {
    "bleeding_edge"
}

/**
 * @return the fixed H2O build identifier used for branch builds, "1-SNAPSHOT"
 */
String getH2OBranchBuildVersion() {
    "1-SNAPSHOT"
}

/**
 * Clones H2O-3 and builds it from the branch named by `testH2OBranch`.
 *
 * The clone and the build are each handed to `retryWithDelay`. The build
 * publishes the artifacts to the Maven repository under
 * `<workspace>/.m2` and also builds the H2O R package.
 *
 * @param props map-like object with a 'testH2OBranch' entry
 */
def buildH2O(props) {
    def cloneRepository = {
        sh "git clone https://github.com/h2oai/h2o-3.git"
    }
    def buildAndPublish = {
        sh """
        cd h2o-3
        git checkout ${props["testH2OBranch"]}
        . /envs/h2o_env_python3.6/bin/activate
        unset CI
        ./gradlew build --parallel -x check -Duser.name=ec2-user
        ./gradlew publishToMavenLocal --parallel -Dmaven.repo.local=${env.WORKSPACE}/.m2 -Duser.name=ec2-user -Dhttp.socketTimeout=600000 -Dhttp.connectionTimeout=600000
        ./gradlew :h2o-r:buildPKG -Duser.name=ec2-user
        cd ..
        """
    }
    retryWithDelay(3, 60, cloneRepository)
    retryWithDelay(5, 1, buildAndPublish)
}

/**
 * Builds the Sparkling Water distribution for one Spark version against the
 * locally built H2O, then builds the Python wheel.
 *
 * After a Gradle clean, the H2O coordinates in `gradle.properties` are
 * rewritten in place with sed, `./gradlew dist` is run with the same
 * coordinates passed as project properties, and finally
 * `python setup.py bdist_wheel` is run in `py/build/pkg`.
 *
 * @param props             map-like object with a 'testH2OBranch' entry
 * @param sparkMajorVersion Spark version passed to Gradle as `-Pspark`
 */
def buildSparklingWater(props, sparkMajorVersion) {
    sh "./gradlew clean"

    // Resolve the H2O coordinates once and reuse them in both commands below.
    def majorName = getH2OBranchMajorName()
    def majorVersion = getH2OBranchMajorVersion()
    def buildVersion = getH2OBranchBuildVersion()

    sh """
        sed -i 's/^h2oMajorName=.*\$/h2oMajorName=${majorName}/' gradle.properties
        sed -i 's/^h2oMajorVersion=.*\$/h2oMajorVersion=${majorVersion}/' gradle.properties
        sed -i 's/^h2oBuild=.*\$/h2oBuild=${buildVersion}/' gradle.properties
        """
    sh "H2O_HOME=${env.WORKSPACE}/h2o-3 ./gradlew dist -Pspark=$sparkMajorVersion -Dmaven.repo.local=${env.WORKSPACE}/.m2 -PbuildAgainstH2OBranch=${props["testH2OBranch"]} -Ph2oMajorVersion=${majorVersion} -Ph2oMajorName=${majorName} -Ph2oBuild=${buildVersion}"

    dir("py/build/pkg") {
        shWithSWPyEnv("python setup.py bdist_wheel")
    }
}

/**
 * Generates `ci/databricksTests/test.r` from `test_template.r` by filling in
 * the locations of the H2O and RSparkling R packages for this cluster.
 *
 * The placeholders are substituted literally (`replace`, not `replaceAll`), so
 * characters such as `$` or `\` in the inserted paths are not interpreted as
 * regex replacement syntax.
 *
 * @param clusterId id of the cluster, part of both substituted paths
 * @param rArtifact file name of the RSparkling package
 */
def prepareRTestFile(clusterId, rArtifact) {
    def h2oPath = "/dbfs/FileStore/${clusterId}/h2o.tar.gz".toString()
    def rsparklingPath = "/dbfs/FileStore/${clusterId}/${rArtifact}".toString()
    def lines = readFile("ci/databricksTests/test_template.r").split("\n")
    def text = lines.collect {
        it.replace("SUBST_H2O_PATH", h2oPath)
                .replace("SUBST_RSPARKLING_PATH", rsparklingPath)
    }.join("\n")
    writeFile file: "ci/databricksTests/test.r", text: text
}

/**
 * Issues `databricks clusters restart` and then waits for the cluster via
 * `waitForCluster`.
 *
 * @param clusterId id of the cluster to restart
 */
def restartCluster(clusterId) {
    def restartCommand = "databricks clusters restart --cluster-id ${clusterId}"
    shWithSWPyEnv(restartCommand)
    waitForCluster(clusterId)
}

/**
 * Removes everything the tests created for a cluster: the workspace directory
 * `/sw-tests-<clusterId>`, the uploaded DBFS files, and the cluster itself.
 *
 * The steps are chained with try/finally so that a failure in an earlier step
 * (for instance the workspace directory not existing) does not prevent the
 * later ones from being attempted. Failures still propagate to the caller.
 *
 * @param clusterId id of the cluster whose environment is torn down
 */
def cleanEnvironment(clusterId) {
    try {
        shWithSWPyEnv "databricks workspace rm -r /sw-tests-${clusterId}"
    } finally {
        try {
            deleteDependencies(clusterId)
        } finally {
            // Always attempted, whatever happened to the cheaper cleanup steps.
            deleteCluster(clusterId)
        }
    }
}

// Main pipeline: build H2O once, then for every Spark version selected by
// getDatabricksSparkVersions build Sparkling Water, start a Databricks cluster,
// install the artifacts and run the R, Scala and Python test notebooks on it.
node("regular") {
    cleanWs()
    checkout scm
    def commons = load 'ci/commons.groovy'
    def props = readPropertiesFile("gradle.properties")
    def version = getSparklingVersion(props)

    stage("Build H2O") {
        commons.withSparklingWaterDockerImage {
            buildH2O(props)
            // Stash the H2O build outputs so each Spark-specific stage can restore them.
            stash name: "h2o", excludes: "h2o-3/h2o-py/h2o/**/*.pyc, h2o-3/h2o-py/h2o/**/h2o.jar", includes: "h2o-3/build/h2o.jar, h2o-3/h2o-dist/buildinfo.json, h2o-3/gradle.properties, .m2/**, h2o-3/h2o-py/h2o/**, h2o-3/h2o-r/h2o_*.99999.tar.gz"
        }
    }

    getDatabricksSparkVersions(props).each { sparkMajorVersion ->
        stage("Test DBC - Spark " + sparkMajorVersion) {
            commons.withSparklingWaterDockerImage {
                commons.withDatabricksCredentials {
                    sh "rm -rf h2o-3"
                    unstash "h2o"
                    def sparkSpecificProps = readPropertiesFile("gradle-spark${sparkMajorVersion}.properties")
                    def dbcVersion = sparkSpecificProps["databricksVersion"]
                    def scalaVersion = sparkSpecificProps["scalaBaseVersion"]
                    shWithSWPyEnv "pip install databricks-cli"
                    buildSparklingWater(props, sparkMajorVersion)
                    String pythonArtifact = "h2o_pysparkling_${sparkMajorVersion}-${version.split("-")[0]}.post1-py2.py3-none-any.whl"
                    String scalaArtifact = "sparkling-water-assembly_${scalaVersion}-${version}-${sparkMajorVersion}-all.jar"
                    String rArtifact = "rsparkling_${version}-${sparkMajorVersion}.tar.gz"
                    def clusterId = startCluster(dbcVersion, sparkMajorVersion)
                    try {
                        waitForCluster(clusterId)
                        installDependencies(clusterId, scalaArtifact, pythonArtifact, rArtifact)
                        String workspaceDir = "/sw-tests-${clusterId}"
                        shWithSWPyEnv "databricks workspace mkdirs ${workspaceDir}"
                        prepareRTestFile(clusterId, rArtifact)
                        test(workspaceDir, clusterId, "r")
                        // The cluster is restarted between the language runs.
                        restartCluster(clusterId)
                        test(workspaceDir, clusterId, "scala")
                        restartCluster(clusterId)
                        test(workspaceDir, clusterId, "python")
                    } catch (Exception failure) {
                        // Once a cluster exists, any failure must not leave it (or its
                        // uploaded files) behind. Cleanup here is best effort: some
                        // helpers clean up on their own before throwing, so a second
                        // attempt may fail and must not hide the original error.
                        try {
                            cleanEnvironment(clusterId)
                        } catch (Exception cleanupFailure) {
                            echo "Cleanup after failure did not complete: ${cleanupFailure}"
                        }
                        throw failure
                    }
                    cleanEnvironment(clusterId)
                }
            }
        }
    }
}
